feat: multiturn, streaming, TaskRun tree #1107
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Walkthrough

Threads prior_trace through adapters, adds MultiturnFormatter and initial_messages, introduces StreamingCompletion and AdapterStream for litellm streaming and tool-call orchestration, adds AiSdkStreamConverter for event conversion, extends BaseAdapter streaming surface and MCPAdapter guards, and adds extensive tests.
Sequence Diagram(s)

sequenceDiagram
participant Client as Client
participant BaseAdapter as BaseAdapter
participant ChatFormatter as ChatFormatter
participant AdapterStream as AdapterStream
participant LiteLLM as LiteLLM
participant ToolSystem as ToolSystem
Client->>BaseAdapter: invoke_openai_stream(input, prior_trace)
BaseAdapter->>ChatFormatter: build_chat_formatter(input, prior_trace)
ChatFormatter-->>BaseAdapter: initial_messages() (MultiturnFormatter)
BaseAdapter->>AdapterStream: _prepare_stream(initial_messages...)
Client->>AdapterStream: async iterate
loop turns
AdapterStream->>LiteLLM: acompletion(messages, stream=True)
LiteLLM-->>AdapterStream: ModelResponseStream chunks
AdapterStream->>AdapterStream: parse chunks, detect tool_call
alt tool_call detected
AdapterStream->>ToolSystem: call tool(arguments)
ToolSystem-->>AdapterStream: tool result
AdapterStream-->>Client: ToolCallEvent (OUTPUT_AVAILABLE)
AdapterStream->>LiteLLM: acompletion(messages + tool_result)
end
AdapterStream-->>Client: AdapterStreamEvent (text/reasoning delta)
end
AdapterStream->>BaseAdapter: finalize -> AdapterStreamResult
BaseAdapter-->>Client: TaskRun
sequenceDiagram
participant Client as Client
participant AiSdkConverter as AiSdkStreamConverter
participant LiteLLM as LiteLLM
Client->>AiSdkConverter: new()
loop for each chunk
LiteLLM-->>Client: ModelResponseStream (delta)
Client->>AiSdkConverter: convert_chunk(chunk)
AiSdkConverter-->>Client: AiSdkStreamEvent (TEXT/REASONING/TOOL events)
end
Client->>AiSdkConverter: finalize()
AiSdkConverter-->>Client: FINISH event (usage/finishReason)
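The convert_chunk/finalize cycle above can be sketched with simplified dict chunks. The real AiSdkStreamConverter operates on litellm ModelResponseStream objects; the `Converter` class, event names, and chunk shape here are assumptions for illustration only.

```python
# Illustrative sketch of converting raw delta chunks into AI-SDK-style events.
from enum import Enum


class EventType(Enum):
    TEXT_START = "text-start"
    TEXT_DELTA = "text-delta"
    FINISH = "finish"


class Converter:
    def __init__(self) -> None:
        self._text_started = False
        self._finish_reason: str | None = None

    def convert_chunk(self, chunk: dict) -> list[tuple[EventType, str]]:
        """Turn one raw chunk into zero or more structured events."""
        events: list[tuple[EventType, str]] = []
        delta = chunk.get("delta") or ""
        if delta:
            if not self._text_started:
                events.append((EventType.TEXT_START, ""))
                self._text_started = True
            events.append((EventType.TEXT_DELTA, delta))
        if chunk.get("finish_reason"):
            self._finish_reason = chunk["finish_reason"]
        return events

    def finalize(self) -> tuple[EventType, str]:
        """Emit the terminal FINISH event with the recorded finish reason."""
        return (EventType.FINISH, self._finish_reason or "stop")


conv = Converter()
events: list[tuple[EventType, str]] = []
for chunk in [{"delta": "Hel"}, {"delta": "lo"}, {"finish_reason": "stop"}]:
    events.extend(conv.convert_chunk(chunk))
events.append(conv.finalize())
print([e[0].value for e in events])
# → ['text-start', 'text-delta', 'text-delta', 'finish']
```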
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the interaction capabilities with AI models by introducing comprehensive streaming and multi-turn conversation support. It allows users to maintain conversation history across multiple interactions and receive model responses in real-time streams, aligning with both OpenAI and AI SDK standards. This refactoring provides a more dynamic and responsive user experience, particularly for complex tasks involving tool use and extended dialogues.
📊 Coverage Report

Overall Coverage: 91% | Diff: origin/main...HEAD
Summary
Line-by-line diff coverage

libs/core/kiln_ai/adapters/chat/chat_formatter.py (Lines 284-297)

284 return ChatTurn(messages=[user_msg], final_call=True)
285
286 if self._state == "awaiting_final":
287 if previous_output is None:
! 288 raise ValueError("previous_output required for final step")
289 self._messages.append(BasicChatMessage("assistant", previous_output))
290 self._state = "done"
291 return None
292
! 293 return None
294
295
296 def get_chat_formatter(
297 strategy: ChatStrategy,

libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py (Lines 24-32)

24 from kiln_ai.adapters.run_output import RunOutput
25 from kiln_ai.datamodel import Usage
26
27 if TYPE_CHECKING:
! 28 from kiln_ai.adapters.model_adapters.litellm_adapter import LiteLlmAdapter
29
30 MAX_CALLS_PER_TURN = 10
31 MAX_TOOL_CALLS_PER_TURN = 30

Lines 74-82

74 "AdapterStream has not been iterated yet. "
75 "Use 'async for event in stream:' before accessing .result"
76 )
77 if self._result is None:
! 78 raise RuntimeError("AdapterStream completed without producing a result")
79 return self._result
80
81 async def __aiter__(self) -> AsyncIterator[AdapterStreamEvent]:
82 self._result = None

Lines 118-126

118 else:
119 yield event
120
121 if not prior_output:
! 122 raise RuntimeError("No assistant message/output returned from model")
123
124 logprobs = self._adapter._extract_and_validate_logprobs(final_choice)
125
126 intermediate_outputs = self._chat_formatter.intermediate_outputs()

Lines 128-136

128 final_choice, intermediate_outputs
129 )
130
131 if not isinstance(prior_output, str):
! 132 raise RuntimeError(f"assistant message is not a string: {prior_output}")
133
134 trace = self._adapter.all_messages_to_trace(self._messages)
135 self._result = AdapterStreamResult(
136 run_output=RunOutput(

Lines 198-206

198 usage=usage,
199 )
200 return
201
! 202 raise RuntimeError(
203 "Model returned neither content nor tool calls. It must return at least one of these."
204 )
205
206 raise RuntimeError(

libs/core/kiln_ai/adapters/model_adapters/base_adapter.py (Lines 64-72)

64 from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
65 from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam
66
67 if TYPE_CHECKING:
! 68 from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStream
69
70 SkillsDict = Dict[str, Skill]
71

Lines 326-334

326 input: InputType,
327 prior_trace: list[ChatCompletionMessageParam] | None,
328 ) -> AdapterStream:
329 if self.input_schema is not None:
! 330 validate_schema_with_value_error(
331 input,
332 self.input_schema,
333 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
334 require_object=False,

Lines 338-347

338
339 formatted_input = input
340 formatter_id = self.model_provider().formatter
341 if formatter_id is not None:
! 342 formatter = request_formatter_from_id(formatter_id)
! 343 formatted_input = formatter.format_input(input)
344
345 return self._create_run_stream(formatted_input, prior_trace)
346
347 def _finalize_stream(

libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py (Lines 266-283)

266 self,
267 input: InputType,
268 prior_trace: list[ChatCompletionMessageParam] | None = None,
269 ) -> AdapterStream:
! 270 provider = self.model_provider()
! 271 if not provider.model_id:
! 272 raise ValueError("Model ID is required for OpenAI compatible models")
273
! 274 chat_formatter = self.build_chat_formatter(input, prior_trace)
! 275 initial_messages: list[ChatCompletionMessageIncludingLiteLLM] = copy.deepcopy(
276 chat_formatter.initial_messages()
277 )
278
! 279 return AdapterStream(
280 adapter=self,
281 provider=provider,
282 chat_formatter=chat_formatter,
283 initial_messages,

libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py (Lines 93-101)

93 "Session continuation is not supported for MCP adapter. "
94 "MCP tools are single-turn and do not maintain conversation state."
95 )
96
! 97 run_output, _ = await self.invoke_returning_run_output(
98 input, input_source, prior_trace, parent_task_run
99 )
100 return run_output

libs/core/kiln_ai/adapters/model_adapters/stream_events.py (Lines 91-99)

91 self._finish_reason = choice.finish_reason
92
93 delta = choice.delta
94 if delta is None:
! 95 continue
96
97 reasoning_content = getattr(delta, "reasoning_content", None)
98 if reasoning_content:
99 if not self._reasoning_started:

libs/core/kiln_ai/datamodel/basemodel.py (Lines 645-653)

645 if parent_types_override is None:
646 # Default: single expected parent type — original behaviour
647 parent = cls.parent_type().load_from_file(parent_path)
648 if parent is None:
! 649 raise ValueError("Parent must be set to load children")
650 else:
651 # Polymorphic parent: read only the model_type field to avoid a full load.
652 with open(parent_path, "r", encoding="utf-8") as fh:
653 actual_parent_type_name = json.loads(fh.read()).get("model_type", "")

Lines 664-672

664 if t.type_name() == actual_parent_type_name
665 )
666 parent = parent_type.load_from_file(parent_path)
667 if parent is None:
! 668 raise ValueError("Parent must be set to load children")
669
670 # Ignore type error: this is abstract base class, but children must implement relationship_name
671 relationship_folder = parent_folder / Path(cls.relationship_name()) # type: ignore

libs/core/kiln_ai/datamodel/task_run.py (Lines 159-167)

159
160 if not isinstance(parent, TaskRun):
161 # the parent is not a TaskRun, but also not a Task, so it is not
162 # a real parent
! 163 return None
164
165 # the parent is a TaskRun, so we just walk up the tree until we find a Task
166 current = parent

Lines 180-188

180 return [Task, TaskRun]
181
182 def runs(self, readonly: bool = False) -> list["TaskRun"]:
183 """The list of child task runs."""
! 184 return super().runs(readonly=readonly) # type: ignore
185
186 def is_root_task_run(self) -> bool:
187 """Is this the root task run? (not nested under another task run)"""
188 # lazy import to avoid circular dependency

Lines 232-240

232 loaded_parent_task = Task.load_from_file(task_path)
233 super().__setattr__("parent", loaded_parent_task)
234 return loaded_parent_task
235
! 236 return None
237
238 @model_validator(mode="after")
239 def check_parent_type(self) -> Self:
240 """Check that the parent is a Task or TaskRun. This overrides the default parent type check
Code Review
This pull request introduces comprehensive streaming capabilities and session continuation for AI model interactions. Key changes include the addition of a StreamingCompletion class for async streaming of litellm responses and an AdapterStream to orchestrate multi-turn conversations and tool calls, yielding raw model response chunks and structured ToolCallEvents. A new stream_events.py module defines AiSdkEventType, AiSdkStreamEvent, and AiSdkStreamConverter to transform raw model and tool call events into a standardized AI SDK event format, supporting detailed streaming feedback. The BaseAdapter and LiteLlmAdapter are updated to support prior_trace (conversation history) for continuing multi-turn sessions, utilizing a new MultiturnFormatter. New invoke_openai_stream and invoke_ai_sdk_stream methods are added to BaseAdapter for streaming interactions. The MCPAdapter explicitly raises NotImplementedError for prior_trace, indicating that multi-turn session continuation is not supported for single-turn MCP tools. Extensive unit and paid integration tests have been added or updated to validate these new features, covering various scenarios including tool calls, error handling, and session continuation. Additionally, .gitignore is updated to exclude test_output/, and existing test mocks are adjusted to align with the new streaming architecture.
Note: Security Review did not run due to the size of the PR.
Actionable comments posted: 5
🧹 Nitpick comments (1)
libs/core/kiln_ai/adapters/model_adapters/stream_events.py (1)
287-290: Consider resetting text/reasoning state in `reset_for_next_step`.

Currently `reset_for_next_step` only clears `_tool_calls_state` and `_finish_reason`. If a model turn ends with active reasoning or text (without an explicit end event), and then tool calls occur followed by a new model turn, the `_text_started` and `_reasoning_started` flags remain set. This could cause issues if the next model turn starts emitting new text or reasoning without proper START events.

However, looking at the usage in `base_adapter.py` (Context snippet 1), `reset_for_next_step` is called when transitioning from a `ToolCallEvent` to a new `ModelResponseStream`, which typically represents a new model response after tool execution. At this point, the previous model turn should have completed its text/reasoning blocks naturally.

Consider also resetting text/reasoning state for robustness:

 def reset_for_next_step(self) -> None:
     """Reset per-step state between LLM calls in a multi-step flow."""
     self._tool_calls_state = {}
     self._finish_reason = None
+    # Reset text/reasoning state for new model turn
+    self._text_started = False
+    self._reasoning_started = False

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py` around lines 287 - 290, reset_for_next_step currently only clears _tool_calls_state and _finish_reason but leaves _text_started and _reasoning_started set, which can cause the next model turn to misinterpret new text/reasoning emits; update reset_for_next_step in stream_events.py to also reset the per-turn text/reasoning state by clearing _text_started and _reasoning_started (and any associated per-turn buffers such as _current_text_block/_current_reasoning_block if present) so a new ModelResponseStream begins with a clean slate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@libs/core/kiln_ai/adapters/chat/chat_formatter.py`:
- Around line 273-283: initial_messages() returns the prior trace but _messages
is never populated from _prior_trace, so messages() and message_dicts() only
include new turns; ensure formatter-owned state preserves the prior trace by
seeding _messages from _prior_trace when starting the conversation. Modify
next_turn (the branch where self._state == "start") to initialize or extend
self._messages with list(self._prior_trace) before appending the new
BasicChatMessage, so the existing prior trace is retained for subsequent
messages() / message_dicts() calls (references: initial_messages, next_turn,
_prior_trace, _messages, messages(), message_dicts()).
In `@libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py`:
- Around line 54-68: The issue is that AdapterStream reuses the same mutable
_messages list across iterations, causing duplicated conversation traces; fix by
storing the original initial_messages in a new attribute (e.g.,
_initial_messages) inside __init__ and on __aiter__ reset self._messages =
list(self._initial_messages) (or a shallow copy) in addition to clearing
self._result and self._iterated so each iteration starts with a fresh message
buffer; update both __init__ (add _initial_messages) and __aiter__ (reset
_messages) accordingly.
- Around line 178-191: The code is including synthetic "task_response" entries
in the tool call execution and counting, allowing synthetic items to be executed
and bypass MAX_TOOL_CALLS_PER_TURN; update the logic around
self._handle_tool_calls, self._extract_task_response and tool_calls_count so you
first separate/filter out any synthetic task_response (use the presence/format
returned by _extract_task_response or an explicit flag) from the list of
tool_calls, call process/iterate only over the real tool calls, and increment
tool_calls_count by the number of real tool calls processed (not counting the
synthetic task_response); apply the same filtering/counting fix to the later
duplicate block (around lines 214-236) so both paths exclude task_response from
execution and from the MAX_TOOL_CALLS_PER_TURN accounting.
In `@libs/server/kiln_server/test_run_api.py`:
- Around line 1757-1785: The _assert_math_tools_response helper currently allows
false positives; update it to (1) assert that the assistant's final content
includes the exact bracketed final answer pattern (e.g., contains "[" +
expected_in_output + "]"), (2) assert that the most recent user message in trace
matches the current user prompt (verify by finding the last message with role
"user" and checking its content equals expected_in_output or another provided
prompt string), and (3) tighten tool call counts by enforcing scenario-specific
minima: require at least 2 tool-calling assistant messages for multi-step cases
(use assistant_with_tool_calls length) and at least 1 tool message for simple
cases, or accept a passed-in minimum count parameter; also keep existing checks
for output, tool messages, last_assistant, and that intermediate_outputs values
are strings. Reference: function _assert_math_tools_response, variables output,
expected_in_output, trace, assistant_with_tool_calls, tool_messages,
last_assistant, intermediate_outputs.
- Around line 1686-1729: The fixture adapter_sanity_check_setup should stop
using the hardcoded /Users/.../Downloads path and instead default to using the
pytest-provided tmp_path; update adapter_sanity_check_setup to construct
project_path under tmp_path (e.g., tmp_path / "adapter_sanity_project" /
"project.kiln"), create parent dirs as needed, and create/load Project and Task
the same way; add support for an opt-in environment variable (e.g.,
KILN_USE_PERSISTENT_ADAPTER_SANITY_PATH) that, when set, uses the original
persistent location for manual runs; ensure you still update
Config.shared()._settings["projects"] and restore original_projects in the
teardown exactly as before.
---
Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py`:
- Around line 287-290: reset_for_next_step currently only clears
_tool_calls_state and _finish_reason but leaves _text_started and
_reasoning_started set, which can cause the next model turn to misinterpret new
text/reasoning emits; update reset_for_next_step in stream_events.py to also
reset the per-turn text/reasoning state by clearing _text_started and
_reasoning_started (and any associated per-turn buffers such as
_current_text_block/_current_reasoning_block if present) so a new
ModelResponseStream begins with a clean slate.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: b95fd129-637e-46f9-bf66-5f387475d44d
📒 Files selected for processing (25)
- .gitignore
- libs/core/kiln_ai/adapters/chat/__init__.py
- libs/core/kiln_ai/adapters/chat/chat_formatter.py
- libs/core/kiln_ai/adapters/chat/test_chat_formatter.py
- libs/core/kiln_ai/adapters/litellm_utils/__init__.py
- libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
- libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py
- libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py
- libs/core/kiln_ai/adapters/model_adapters/base_adapter.py
- libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
- libs/core/kiln_ai/adapters/model_adapters/mcp_adapter.py
- libs/core/kiln_ai/adapters/model_adapters/stream_events.py
- libs/core/kiln_ai/adapters/model_adapters/test_adapter_stream.py
- libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py
- libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py
- libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
- libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_tools.py
- libs/core/kiln_ai/adapters/model_adapters/test_mcp_adapter.py
- libs/core/kiln_ai/adapters/model_adapters/test_saving_adapter_results.py
- libs/core/kiln_ai/adapters/model_adapters/test_stream_events.py
- libs/core/kiln_ai/adapters/model_adapters/test_structured_output.py
- libs/core/kiln_ai/adapters/test_prompt_adaptors.py
- libs/core/kiln_ai/adapters/test_prompt_builders.py
- libs/core/kiln_ai/datamodel/test_basemodel.py
- libs/server/kiln_server/test_run_api.py
Actionable comments posted: 3
🧹 Nitpick comments (1)
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py (1)
355-367: Make the continuation assertion prove `prior_trace` was used.

Right now this only checks that the second call streamed something, which still passes if `prior_trace` is accidentally dropped and the model answers generically. Aggregate the continuation text and assert it references the first turn's result so the test actually guards the multi-turn path.

Suggested assertion:

 continuation_chunks: list[litellm.ModelResponseStream] = []
+continuation_text_parts: list[str] = []
 async for chunk in adapter.invoke_openai_stream(
     input="What was the result? Reply in one short sentence.",
     prior_trace=initial_run.trace,
 ):
     continuation_chunks.append(chunk)
+    if chunk.choices:
+        delta = chunk.choices[0].delta
+        if delta is not None and delta.content is not None:
+            continuation_text_parts.append(delta.content)
 _dump_paid_test_output(request, continuation_chunks=continuation_chunks)
 assert len(continuation_chunks) > 0, "No continuation chunks collected"
+assert "444" in "".join(continuation_text_parts), (
+    "Continuation response did not reflect the previous turn"
+)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py` around lines 355 - 367, The test only checks that adapter.invoke_openai_stream produced continuation_chunks but not that prior_trace was honored; aggregate the streamed text from continuation_chunks (e.g., join the chunk text fields) and assert the aggregated continuation references the first turn's result from initial_run (for the "123 + 321 = ?" prompt assert the continuation contains "444" or otherwise includes the value/text from initial_run.trace), using adapter.invoke_openai_stream, initial_run, prior_trace and continuation_chunks to locate where to change the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py`:
- Line 84: _self._usage_data is currently overwritten by each tool step so
finalize() reports only the last turn; change the logic that assigns to
_usage_data to merge/accumulate usage instead: when a new usage dict/object
arrives (where code currently does self._usage_data = ... in the stream event
handlers), if self._usage_data is None set it to the new usage, otherwise sum
numeric token fields (e.g., prompt_tokens, completion_tokens, total_tokens) and
merge any other keys (prefer summing counts and keeping latest non-numeric
values) into the existing self._usage_data; ensure finalize() then reads the
accumulated totals. Update all places that set _usage_data (the spots handling
model usage events and where _usage_data is currently assigned) to use this
merge/accumulate logic and keep the attribute type consistent.
- Around line 250-295: In finalize(), emit a matching finish-step event before
the terminal FINISH to complete the start/start-step lifecycle: add
events.append(AiSdkStreamEvent(AiSdkEventType.FINISH_STEP)) (or include the
appropriate step id/payload if you track one) immediately before the block that
appends AiSdkEventType.FINISH so the AiSdkStreamConverter produces the
corresponding FINISH_STEP event to mirror BaseAdapter's start-step; update
finalize() (and use AiSdkStreamEvent and AiSdkEventType.FINISH_STEP)
accordingly.
In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`:
- Around line 148-153: The loop over chunks dereferences chunk.choices[0]
without ensuring choices exists; update the loop (the "for chunk in chunks:"
block) to skip chunks with no choices first (e.g., if not chunk.choices:
continue) before checking finish_reason or accessing delta, so you never access
choices[0] on an empty/usage-only chunk.
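The guard this comment describes looks like the following in miniature, using plain dicts in place of litellm's ModelResponseStream (the chunk shape is a simplified assumption):

```python
# Defensive chunk handling: skip chunks without choices (e.g. usage-only
# chunks) before touching choices[0].
def collect_text(chunks: list[dict]) -> str:
    parts: list[str] = []
    for chunk in chunks:
        choices = chunk.get("choices") or []
        if not choices:
            continue  # usage-only or empty chunk: must not crash
        delta = choices[0].get("delta") or {}
        content = delta.get("content")
        if content:
            parts.append(content)
    return "".join(parts)


chunks = [
    {"choices": [{"delta": {"content": "4"}}]},
    {"choices": []},  # usage-only chunk
    {"choices": [{"delta": {"content": "44"}}]},
]
print(collect_text(chunks))  # → 444
```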
---
Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py`:
- Around line 355-367: The test only checks that adapter.invoke_openai_stream
produced continuation_chunks but not that prior_trace was honored; aggregate the
streamed text from continuation_chunks (e.g., join the chunk text fields) and
assert the aggregated continuation references the first turn's result from
initial_run (for the "123 + 321 = ?" prompt assert the continuation contains
"444" or otherwise includes the value/text from initial_run.trace), using
adapter.invoke_openai_stream, initial_run, prior_trace and continuation_chunks
to locate where to change the test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 56fbfe5d-c3d8-47e8-87b0-73664216ad4f
📒 Files selected for processing (3)
- libs/core/kiln_ai/adapters/model_adapters/stream_events.py
- libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
- libs/core/kiln_ai/adapters/model_adapters/test_stream_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
- libs/core/kiln_ai/adapters/model_adapters/test_stream_events.py
self._reasoning_block_count = 0
self._tool_calls_state: dict[int, dict[str, Any]] = {}
self._finish_reason: str | None = None
self._usage_data: Any = None
Aggregate usage across tool steps instead of overwriting it.
The same converter instance spans the whole streamed run, but _usage_data only keeps one usage object. In a tool loop, later model turns overwrite earlier ones, so finalize() reports only the last turn’s token counts instead of the total for the stream.
Possible fix
@@
- self._usage_data: Any = None
+ self._usage_totals = {
+ "prompt_tokens": 0,
+ "completion_tokens": 0,
+ "total_tokens": 0,
+ }
+ self._saw_usage = False
@@
- if not chunk.choices:
- usage = getattr(chunk, "usage", None)
- if usage is not None:
- self._usage_data = usage
+ usage = getattr(chunk, "usage", None)
+ if usage is not None:
+ self._saw_usage = True
+ self._usage_totals["prompt_tokens"] += (
+ getattr(usage, "prompt_tokens", 0) or 0
+ )
+ self._usage_totals["completion_tokens"] += (
+ getattr(usage, "completion_tokens", 0) or 0
+ )
+ self._usage_totals["total_tokens"] += (
+ getattr(usage, "total_tokens", 0) or 0
+ )
@@
- if self._usage_data is not None:
+ if self._saw_usage:
usage_payload: dict[str, Any] = {
- "promptTokens": self._usage_data.prompt_tokens,
- "completionTokens": self._usage_data.completion_tokens,
+ "promptTokens": self._usage_totals["prompt_tokens"],
+ "completionTokens": self._usage_totals["completion_tokens"],
}
- total = getattr(self._usage_data, "total_tokens", None)
- if total is not None:
- usage_payload["totalTokens"] = total
+ if self._usage_totals["total_tokens"]:
+ usage_payload["totalTokens"] = self._usage_totals["total_tokens"]

Also applies to: 206-209, 275-283
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@libs/core/kiln_ai/adapters/model_adapters/stream_events.py` at line 84,
_self._usage_data is currently overwritten by each tool step so finalize()
reports only the last turn; change the logic that assigns to _usage_data to
merge/accumulate usage instead: when a new usage dict/object arrives (where code
currently does self._usage_data = ... in the stream event handlers), if
self._usage_data is None set it to the new usage, otherwise sum numeric token
fields (e.g., prompt_tokens, completion_tokens, total_tokens) and merge any
other keys (prefer summing counts and keeping latest non-numeric values) into
the existing self._usage_data; ensure finalize() then reads the accumulated
totals. Update all places that set _usage_data (the spots handling model usage
events and where _usage_data is currently assigned) to use this merge/accumulate
logic and keep the attribute type consistent.
P2 - probably should do, but the typing is a bit unreliable right now so adding that will take a fair amount of testing
@leonardmq, understood — the loosely-typed usage object returned by litellm across different providers does make safe accumulation tricky without thorough integration testing. Would you like me to open a follow-up GitHub issue to track the usage aggregation across tool steps so it doesn't get lost?
🧠 Learnings used
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 1107
File: libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py:54-68
Timestamp: 2026-03-16T19:18:44.984Z
Learning: In Kiln-AI/Kiln, `AdapterStream` (in `libs/core/kiln_ai/adapters/model_adapters/adapter_stream.py`) is always constructed fresh per call via `_create_run_stream` in `litellm_adapter.py`. It is never reused or re-iterated; a new instance is created for every `invoke_openai_stream`/`invoke_ai_sdk_stream` invocation through `_prepare_stream`. There is no need to reset `_messages` on re-iteration.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 418
File: libs/core/kiln_ai/adapters/ml_model_list.py:1875-1890
Timestamp: 2025-08-08T16:13:26.526Z
Learning: In libs/core/kiln_ai/adapters/ml_model_list.py (Python), do not blanket-add r1_thinking/optional_r1_thinking parsers for R1-style models. Parser usage is provider-specific and must be based on observed responses in tests. For PR Kiln-AI/Kiln#418, deepseek_r1_0528_distill_qwen3_8b providers were validated without needing a parser.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 546
File: app/web_ui/src/routes/(app)/docs/library/[project_id]/upload_file_dialog.svelte:107-120
Timestamp: 2025-09-10T08:32:18.688Z
Learning: leonardmq prefers to work within the constraints of their SDK codegen for API calls, even when typing is awkward (like casting FormData to match expected types), rather than using alternative approaches like native fetch that would cause compiler errors with their generated types.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 413
File: libs/core/kiln_ai/datamodel/chunk.py:138-160
Timestamp: 2025-08-22T11:17:56.862Z
Learning: leonardmq prefers to avoid redundant validation checks when upstream systems already guarantee preconditions are met. He trusts the attachment system to ensure paths are properly formatted and prefers letting the attachment method (resolve_path) handle any edge cases rather than adding defensive precondition checks.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 390
File: libs/core/kiln_ai/adapters/provider_tools.py:494-499
Timestamp: 2025-07-23T08:58:45.769Z
Learning: leonardmq prefers to keep tightly coupled implementation details (like API versions) hardcoded when they are not user-facing and could break other modules if changed. The shared Config class is reserved for user-facing customizable values, not internal implementation details.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 341
File: libs/server/kiln_server/document_api.py:44-51
Timestamp: 2025-06-18T08:22:58.510Z
Learning: leonardmq prefers to defer fixing blocking I/O in async handlers when: the operation is very fast (milliseconds), user-triggered rather than automated, has no concurrency concerns, and would require additional testing to fix properly. He acknowledges such issues as valid but makes pragmatic decisions about timing the fixes.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 487
File: libs/core/kiln_ai/datamodel/rag.py:33-35
Timestamp: 2025-09-04T06:45:44.212Z
Learning: leonardmq requires vector_store_config_id to be a mandatory field in RagConfig (similar to extractor_config_id, chunker_config_id, embedding_config_id) for consistency. He prefers fixing dependent code that breaks due to missing required fields rather than making fields optional to accommodate incomplete data.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 654
File: app/web_ui/src/routes/(app)/docs/rag_configs/[project_id]/create_rag_config/edit_rag_config_form.svelte:141-152
Timestamp: 2025-09-25T06:38:14.854Z
Learning: leonardmq prefers simple onMount initialization patterns over reactive statements when possible, and is cautious about maintaining internal state for idempotency in Svelte components. He values simplicity and safety in component lifecycle management.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 402
File: libs/core/kiln_ai/adapters/embedding/litellm_embedding_adapter.py:0-0
Timestamp: 2025-07-14T03:43:07.283Z
Learning: leonardmq prefers to keep defensive validation checks even when they're technically redundant, viewing them as useful "quick sanity checks" that provide additional safety nets. He values defensive programming over strict DRY (Don't Repeat Yourself) principles when the redundant code serves as a safeguard.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 546
File: app/web_ui/src/lib/api_schema.d.ts:0-0
Timestamp: 2025-09-10T08:27:47.227Z
Learning: leonardmq correctly identifies auto-generated files and prefers not to manually edit them. When suggesting changes to generated TypeScript API schema files, the fix should be made in the source OpenAPI specification in the backend, not in the generated TypeScript file.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 487
File: libs/core/kiln_ai/adapters/vector_store/lancedb_adapter.py:136-150
Timestamp: 2025-09-04T06:34:12.318Z
Learning: leonardmq prefers brute force re-insertion of all chunks when partial chunks exist in the LanceDB vector store, rather than selectively inserting only missing chunks. His reasoning is that documents typically have only dozens of chunks (making overwriting cheap), and partial insertion likely indicates data corruption or conflicts that should be resolved by re-inserting all chunks to ensure consistency.
Learnt from: leonardmq
Repo: Kiln-AI/Kiln PR: 487
File: libs/core/kiln_ai/adapters/vector_store/lancedb_adapter.py:98-103
Timestamp: 2025-09-04T07:08:04.248Z
Learning: leonardmq prefers to let TableNotFoundError bubble up in delete_nodes_by_document_id operations rather than catching it, as this error indicates operations are being done in the wrong order on the caller side and should be surfaced as a programming error rather than handled gracefully.
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter_streaming.py
…eat-stream-multiturn-ai-sdk-openai-protocols
🧹 Nitpick comments (3)
libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py (2)
493-496: Assert that `prior_trace` is forwarded explicitly.
`call_args[1].get("prior_trace") is None` also passes when the kwarg is omitted entirely, so this test can miss the regression it is meant to catch.

🧪 Tighten the assertion

```diff
- assert adapter._run.call_args[1].get("prior_trace") is None
+ assert "prior_trace" in adapter._run.call_args.kwargs
+ assert adapter._run.call_args.kwargs["prior_trace"] is None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py` around lines 493 - 496, The test currently checks adapter._run.call_args[1].get("prior_trace") is None which doesn't fail if the kwarg is omitted; update the assertion to assert that the prior_trace kwarg is present and explicitly None by checking the call keyword dict contains the key and its value is None (i.e., inspect adapter._run.call_args[1] for the "prior_trace" key and assert adapter._run.call_args[1]["prior_trace"] is None) so the test for adapter.invoke and adapter._run will catch regressions where prior_trace is not forwarded.
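To see why the looser check is vacuous, here is a self-contained sketch using `unittest.mock`; the bare `Mock` stands in for `adapter._run` and is not Kiln's actual adapter:

```python
from unittest.mock import Mock

# Simulate an adapter whose _run was called WITHOUT the prior_trace kwarg.
run = Mock()
run("some input")  # prior_trace omitted entirely

# The weak assertion still passes: .get() returns None for a missing key.
assert run.call_args[1].get("prior_trace") is None

# The strict assertion catches the regression: the kwarg was never forwarded.
assert "prior_trace" not in run.call_args.kwargs
```

Checking key membership on `call_args.kwargs` distinguishes "forwarded as None" from "never forwarded at all", which is the distinction the test needs.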
1060-1239: Add one streaming test that threads `prior_trace` through.
The new multiturn assertions only cover `invoke`/`invoke_returning_run_output`. A regression where `invoke_openai_stream` or `invoke_ai_sdk_stream` drops conversation history would still pass this suite.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py` around lines 1060 - 1239, Add a streaming test that verifies prior_trace is threaded through invoke_openai_stream and invoke_ai_sdk_stream: create a stream_adapter whose base_task includes a non-empty prior_trace, patch stream_adapter._prepare_stream to return a FakeAdapterStream (yielding one chunk) and patch _finalize_stream to return a TaskRun-like object; call invoke_openai_stream (and a separate test for invoke_ai_sdk_stream), iterate the stream to completion, and assert that the TaskRun or the object captured in the patched _prepare_stream/_finalize_stream contains the same prior_trace from the base task (use symbols stream_adapter, _prepare_stream, _finalize_stream, invoke_openai_stream, invoke_ai_sdk_stream, TaskRun) so the test fails if conversation history is dropped.

libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py (1)
1485-1599: Also assert that the caller's `prior_trace` stays untouched.
The production path deep-copies seeded messages specifically because LiteLLM mutates message objects. Adding a regression assertion here would protect that guarantee for the nested tool-call trace as well.

🧪 Suggested assertion

```diff
+ original_prior_trace = json.loads(json.dumps(prior_trace))
  run_output, _ = await adapter._run("what else?", prior_trace=prior_trace)
+ assert prior_trace == original_prior_trace
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py` around lines 1485 - 1599, The test test_run_with_prior_trace_preserves_tool_calls must also assert that the caller's prior_trace is not mutated by the adapter; after calling await adapter._run("what else?", prior_trace=prior_trace) add an assertion comparing the original prior_trace structure (e.g., checking specific nested fields like the first assistant tool_calls id "call_abc123" and subsequent tool response contents "28" and "172", or doing a deep-equality check) to ensure adapter._run (and its helper mock_run_model_turn) did not modify prior_trace; reference the prior_trace variable and the adapter._run/_run_model_turn symbols when adding this assertion.
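The snapshot-and-compare pattern this comment suggests can be sketched in isolation. The message shapes below only mimic the test's nested tool-call trace, and `well_behaved_run` is a hypothetical stand-in for the adapter's deep-copying run path:

```python
import copy

# Illustrative messages shaped like the nested tool-call trace in the test.
prior_trace = [
    {"role": "assistant", "tool_calls": [{"id": "call_abc123"}]},
    {"role": "tool", "content": "28"},
    {"role": "tool", "content": "172"},
]
snapshot = copy.deepcopy(prior_trace)

# A run that deep-copies its input (as the production path should) leaves
# the caller's list untouched even while appending new turns internally.
def well_behaved_run(messages):
    local = copy.deepcopy(messages)
    local.append({"role": "assistant", "content": "follow-up answer"})
    return local

well_behaved_run(prior_trace)

# This is the regression assertion: it fails the moment anything mutates
# the caller's trace in place.
assert prior_trace == snapshot
```

A deep-equality check against a pre-call snapshot is cheap here and guards the whole nested structure at once, rather than spot-checking individual fields.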
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py`:
- Around line 493-496: The test currently checks
adapter._run.call_args[1].get("prior_trace") is None which doesn't fail if the
kwarg is omitted; update the assertion to assert that the prior_trace kwarg is
present and explicitly None by checking the call keyword dict contains the key
and its value is None (i.e., inspect adapter._run.call_args[1] for the
"prior_trace" key and assert adapter._run.call_args[1]["prior_trace"] is None)
so the test for adapter.invoke and adapter._run will catch regressions where
prior_trace is not forwarded.
- Around line 1060-1239: Add a streaming test that verifies prior_trace is
threaded through invoke_openai_stream and invoke_ai_sdk_stream: create a
stream_adapter whose base_task includes a non-empty prior_trace, patch
stream_adapter._prepare_stream to return a FakeAdapterStream (yielding one
chunk) and patch _finalize_stream to return a TaskRun-like object; call
invoke_openai_stream (and a separate test for invoke_ai_sdk_stream), iterate the
stream to completion, and assert that the TaskRun or the object captured in the
patched _prepare_stream/_finalize_stream contains the same prior_trace from the
base task (use symbols stream_adapter, _prepare_stream, _finalize_stream,
invoke_openai_stream, invoke_ai_sdk_stream, TaskRun) so the test fails if
conversation history is dropped.
In `@libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py`:
- Around line 1485-1599: The test test_run_with_prior_trace_preserves_tool_calls
must also assert that the caller's prior_trace is not mutated by the adapter;
after calling await adapter._run("what else?", prior_trace=prior_trace) add an
assertion comparing the original prior_trace structure (e.g., checking specific
nested fields like the first assistant tool_calls id "call_abc123" and
subsequent tool response contents "28" and "172", or doing a deep-equality
check) to ensure adapter._run (and its helper mock_run_model_turn) did not
modify prior_trace; reference the prior_trace variable and the
adapter._run/_run_model_turn symbols when adding this assertion.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 6f85a09d-8db8-4c3f-92d8-aa3ef3bcbd84
📒 Files selected for processing (3)
libs/core/kiln_ai/adapters/model_adapters/litellm_adapter.py
libs/core/kiln_ai/adapters/model_adapters/test_base_adapter.py
libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter.py
…eat-stream-multiturn-ai-sdk-openai-protocols
Actionable comments posted: 1
🧹 Nitpick comments (1)
libs/core/kiln_ai/adapters/model_adapters/base_adapter.py (1)
152-241: Consolidate duplicated parse/validate/finalize logic across sync and streaming paths.
The output parsing, schema validation, reasoning checks, and run-save flow are duplicated in two methods. Extracting a shared helper will reduce drift risk and simplify future changes.
Also applies to: 325-395
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/core/kiln_ai/adapters/model_adapters/base_adapter.py` around lines 152 - 241, The parsing/validation/reasoning and run-save logic duplicated in _run_returning_run_output and the streaming counterpart should be extracted into a single helper (e.g., _finalize_run or _postprocess_run) that accepts parsed_output, run_output, usage, input, input_source and returns (run, run_output) or just run; move the parse json-to-dict, validate_schema_with_value_error checks, reasoning-capable check, generate_run call, and the save vs clear-id logic (including run.save_to_file and run.id = None behavior) into that helper; update both _run_returning_run_output and the streaming method to call this helper after parsing so behavior and error messages remain identical.
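As a rough illustration of the consolidation idea, a single helper can own parse/validate/save for both call sites. `finalize_run` and its behavior below are hypothetical, not Kiln's actual `_finalize_run` or `generate_run`:

```python
import json

# Hypothetical shared post-processing helper: both the sync path and the
# streaming path would delegate here instead of duplicating this logic.
def finalize_run(raw_output: str, structured: bool, save: bool) -> dict:
    if structured:
        parsed = json.loads(raw_output)
        # Mirrors a schema sanity check: structured output must be an object.
        if not isinstance(parsed, dict):
            raise ValueError("structured output must be a JSON object")
    else:
        parsed = raw_output
    run = {"output": parsed, "id": "run-1"}
    if not save:
        run["id"] = None  # mirrors the clear-id behavior for unsaved runs
    return run

# Sync path and streaming path both call the same helper:
sync_run = finalize_run('{"answer": 42}', structured=True, save=True)
stream_run = finalize_run("plain text", structured=False, save=False)
```

Because both paths share one function, error messages and edge-case behavior (e.g. the clear-id rule) cannot drift apart between sync and streaming.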
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@libs/core/kiln_ai/adapters/model_adapters/base_adapter.py`:
- Around line 730-734: The FINISH_STEP event is emitted before the adapter's
finalization completes, causing clients to see completion prior to possible
errors; change the sequence so that self._task_run =
self._adapter._finalize_stream(adapter_stream, self._input, self._input_source)
is executed first and only after it returns successfully emit yield
AiSdkStreamEvent(AiSdkEventType.FINISH_STEP); this ensures _finalize_stream's
parsing/validation/save completes before AiSdkEventType.FINISH_STEP is sent and
preserves current variable usage of _task_run, adapter_stream, self._input, and
self._input_source.
---
Nitpick comments:
In `@libs/core/kiln_ai/adapters/model_adapters/base_adapter.py`:
- Around line 152-241: The parsing/validation/reasoning and run-save logic
duplicated in _run_returning_run_output and the streaming counterpart should be
extracted into a single helper (e.g., _finalize_run or _postprocess_run) that
accepts parsed_output, run_output, usage, input, input_source and returns (run,
run_output) or just run; move the parse json-to-dict,
validate_schema_with_value_error checks, reasoning-capable check, generate_run
call, and the save vs clear-id logic (including run.save_to_file and run.id =
None behavior) into that helper; update both _run_returning_run_output and the
streaming method to call this helper after parsing so behavior and error
messages remain identical.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: b475c187-7a11-4d91-92da-86a7276afa2c
📒 Files selected for processing (1)
libs/core/kiln_ai/adapters/model_adapters/base_adapter.py
…tocols' of github.com:Kiln-AI/Kiln into leonard/kil-461-feat-nesting-task-runs-into-each-other
…sk-runs-into-each-other feat: support nesting task runs into each other
…eat-stream-multiturn-ai-sdk-openai-protocols
…tocols' of github.com:Kiln-AI/Kiln into leonard/kil-447-feat-stream-multiturn-ai-sdk-openai-protocols
…eat-stream-multiturn-ai-sdk-openai-protocols
What does this PR do?
This PR adds support for:
`trace` to continue the conversation

Demo here: Kiln-AI/ai-sdk-python-streaming#1
Checklists
Summary by CodeRabbit
New Features
Bug Fixes / Behavior
Tests
Chores